-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: transactions #323
base: master
Are you sure you want to change the base?
feat: transactions #323
Conversation
@FlorentinDUBOIS Tagging you for visibility. I know this is a lot so no rush here! |
Bumping and tagging some additional folks - @freeznet @BewareMyPower |
I'm going to review this PR soon. |
Thanks you @rkrishn7, I will take a look at this pull request and test it in production next week. Sorry for the delay for my answer |
current_backoff = std::cmp::min( | ||
Self::OP_MAX_BACKOFF * 2u32.saturating_pow(current_retries), | ||
Self::OP_MAX_BACKOFF, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use cmp::min
here? OP_MAX_BACKOFF
should always be the smaller one. It looks like you should use OP_MIN_BACKOFF * 2u32.saturating_pow(current_retries)
.
BTW, could you reuse the operation_retry_parameters
in Pulsar::new
instead of hard coding backoff parameters? And it would be better to abstract a Backoff
class like Java so that we can reuse the logic in connect_inner
rather than rewriting the same logic again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use
cmp::min
here?OP_MAX_BACKOFF
should always be the smaller one. It looks like you should useOP_MIN_BACKOFF * 2u32.saturating_pow(current_retries)
.
Thanks for catching that! Updated to use OP_MIN_BACKOFF
there.
BTW, could you reuse the
operation_retry_parameters
inPulsar::new
instead of hard coding backoff parameters?
The same operation_retry_parameters
as declared in Pulsar::new
should already be getting used here because these operations rely on ConnectionSender::send_message
.
The retry logic here is solely for TransactionCoordinatorNotFound
errors. I think it's possible to make the backoff parameters here configurable, but they should be different from operation_retry_parameters
.
And it would be better to abstract a
Backoff
class like Java so that we can reuse the logic inconnect_inner
rather than rewriting the same logic again
Agreed, but I think maybe we can make another issue for this? May not be a great idea to increase the scope of this PR since it's already quite large.
I don't think so.
Sorry I don't get it. It only registers the partition with the transaction id via the |
👍🏾
Sorry! To clarify, I think there's a potential footgun lurking here. For example: Let's say we've enabled transactions and start producing transactional messages with a batching producer. If we haven't hit the batching threshold before the transaction's timeout, then all those buffered messages will fail to be produced. This is subtle, but definitely seems undesirable. It seems like either transactional messages should bypass batching or we should warn the user if both transactions and batching are enabled. |
Hi @BewareMyPower @FlorentinDUBOIS Just bumping this PR! Would love to get this in soon 😄 |
I am on holiday until the 10 of February, I will be able to test after. |
This PR adds client support for Pulsar Transactions.
Resolves #253
Notes:
Open Questions:
Transaction
API uses internal synchronization to provide a "no mut" public interface. Are there any alternative approaches that may be better?